package com.mti.service.impl;

import com.mti.exception.BusinessException;
import com.mti.service.IWebsocketService;
import com.mti.vo.Message;
import com.mti.vo.MessageText;
import com.mti.websocket.SocketSessionRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;

@Slf4j
@Service
public class WebsocketServiceImpl implements IWebsocketService {

    @Autowired
    private SocketSessionRegistry sessionRegistry;

    @Override
    public void sendMessage(final Message message) {
        Optional.ofNullable(message).orElseThrow(() -> new BusinessException(500,"消息不能为空"));
        List<String> targets = Optional.ofNullable(message.getTarget()).orElseThrow(() -> new BusinessException(500,"消息接收方不能为空"));
        Collection<WebSocketSession> sessions = sessionRegistry.getSessionByUsers(targets);
        for (WebSocketSession session : sessions) {
            session.send(Flux.just(session.binaryMessage(f -> f.wrap(message.getContent())))).then().toProcessor();
        }
    }

    /**
     * 发送消息（Text）
     * @param message 消息实体
     */
    @Override
    public void sendMessageText(final MessageText message) {
        Optional.ofNullable(message).orElseThrow(() -> new BusinessException(500,"消息不能为空"));
        List<String> targets = Optional.ofNullable(message.getTarget()).orElseThrow(() -> new BusinessException(500,"消息接收方不能为空"));
        Collection<WebSocketSession> sessions = sessionRegistry.getSessionByUsers(targets);

        sessionRegistry.getAllSessionWebSocketInfos();
        for (WebSocketSession session : sessions) {
            String content = message.getContent();
            log.info("收到文本消息:{}",content);
            session.send(Flux.just(session.textMessage(content))).then().toProcessor();
        }
    }

    /**
     * 发送消息（Text）
     * @param message 消息实体
     */
    @Override
    public void sendMessageTextToAll(final MessageText message) {
        Optional.ofNullable(message).orElseThrow(() -> new BusinessException(500,"消息不能为空"));
        // 获取用户session
        ConcurrentMap<String, WebSocketSession> sessions =  sessionRegistry.getAllSessionWebSocketInfos();
        for(Map.Entry<String, WebSocketSession> entry: sessions.entrySet() ){
            WebSocketSession session = entry.getValue();
            String content = message.getContent();
            log.info("给所有人发消息,entry.getKey:{},entry.getValue:{},apiResponse:{}",entry.getKey(), entry.getValue(),content);
            session.send(Flux.just(session.textMessage(content))).then().toProcessor();
        }
    }

    /**
     * 发送消息（Text）
     * @param message 消息实体
     */
    @Override
    public void sendMessageTextToSome(final MessageText message) {
        Optional.ofNullable(message).orElseThrow(() -> new BusinessException(500,"消息不能为空"));
        // 获取用户session
        ConcurrentMap<String, WebSocketSession> sessions =  sessionRegistry.getPcSessionWebSocket();
        for(Map.Entry<String, WebSocketSession> entry: sessions.entrySet() ){
            WebSocketSession session = entry.getValue();
            String content = message.getContent();
            log.info("给所有人发消息,entry.getKey:{},entry.getValue:{},apiResponse:{}",entry.getKey(), entry.getValue(),content);
            session.send(Flux.just(session.textMessage(content))).then().toProcessor();
        }
    }

    /**
     * 发送消息
     *
     * @param messageText 消息实体
     * @param mjidList    民警ID
     */
    @Override
    public void sendMessage2AppMj(final MessageText messageText, List<String> mjidList) {
        Optional.ofNullable(messageText).orElseThrow(() -> new BusinessException(500,"消息不能为空"));
        Collection<WebSocketSession> sessions = sessionRegistry.getSessionByUsers(mjidList);
        for (WebSocketSession session : sessions) {
            System.out.println("WebsocketServiceImpl::sendMessage2Mj - mjidList: " + mjidList.toString() + ", sessions: " + sessions.toString());
            session.send(Flux.just(session.textMessage(messageText.getContent()))).then().toProcessor();
        }
    }
}
